整合 RocketMQ 使用

第三方组件声明

本文基于 rocketmq-v5-client-spring-boot 进行整合。服务端安装与基础概念请优先参考 RocketMQ 官方文档,Spring Boot 接入与高级能力请优先参考 RocketMQ Spring 官方仓库,本文只做基础整合和基础使用。

环境组件版本备注
RocketMQ5.3.0
PigX5.5
JDK17分支:jdk17

安装 RocketMQ

  • 下载脚本文件
git clone https://github.com/pig-mesh/rocketmq-docker-compose.git
  • 执行启动 RocketMQ
docker compose up -d
  • 创建临时 topic
docker exec -it rmqbroker bash
sh mqadmin updatetopic -t TestTopic -c DefaultCluster

代码整合

  • 目标服务增加 RocketMQ 依赖 jar
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-v5-client-spring-boot</artifactId>
    <version>2.3.1</version>
</dependency>
  • Nacos 对应的服务配置文件增加链接相关信息
rocketmq:
  producer:
    topic: TestTopic
    endpoints: 127.0.0.1:8081
    tag: '*'
  push-consumer:
    endpoints: 127.0.0.1:8081
    tag: '*'
  • 配置队列监听消费者
@Slf4j
@Service
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "test-group", tag = "*", filterExpressionType = "tag")
public class MyConsumer1 implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("received message: {}", message);
        return ConsumeResult.SUCCESS;
    }
}
  • 测试消息发送
@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private RocketMQClientTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
        rocketMQTemplate.convertAndSend("TestTopic", "Hello, World!");
    }
}

特殊说明

  • 在 MQ 消费监听逻辑中调用 Feign , 参考 :pigx token 传递及 feign 调用中的无token 调用

A 服务并没有 token 去请求 B 服务,pigx 也对这种情况进行了兼容。类似于 A 对外暴露 API,但是又安全限制。参考日志插入情况

FeignClient 需要带一个请求 token,FROM_IN 声明是内部调用

remoteLogService.saveLog(sysLog, SecurityConstants.FROM_IN);

目标接口对内外调用进行限制 @Inner 注解,这样就避免接口对外暴露的安全问题。只能通过内部调用才能使用,浏览器不能直接访问该接口

@Inner
@PostMapping
public R save(@Valid @RequestBody SysLog sysLog) {
    return new R<>(sysLogService.save(sysLog));
}
  • 在 MQ 消费监听逻辑中调用数据库查询需要手动指定租户编号查询指定租户的信息,不然均是查询的租户 1
TenantContextHolder.set(id);
  • 在 MQ 消费监听逻辑中不能使用数据权限等